package com.kafka.cn.producer;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

/**
 * @author: yangShen
 * @Description:
 * @Date: 2020/3/31 14:54
 */
public class PartitionProducer {
    public static void main(String[] args) {
        // 1. 创建kafka生产者的配置信息
        Properties properties = new Properties();
        // 2.指定kafka连接的集群，broker-list
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.16.26.16:9092");
        // 3.ACK应答级别
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        // 4.重试次数
        properties.put("retries", 3);
        // 5.批次大小：16k
        properties.put("batch.size", 16384);
        // 6.等待时间,超过1ms会自动断开连接
        properties.put("linger.ms", 1);
        // 7.RecordAccumulator 缓冲区大小：32M
        properties.put("buffer.memory", 33554432);
        // 8.key,value序列化类
        properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.kafka.cn.partitioner.MyPartititoner");

        // 9.创建生产者对象
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        //10.发送数据：鼠标放在编译错误处 ctrl+p
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>("first", "atguigu---" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (null == exception){
                        System.out.println(metadata.partition()+"---"+metadata.offset());
                    }else {
                        exception.printStackTrace();
                    }
                }
            });
        }

        // 11.关闭资源，如果不关闭，等待时间没有超过1ms，消费者会接收不到数据
        producer.close();
    }
}
